package fun.easycode.datastream;

import fun.easycode.datastream.util.RetryUtil;
import fun.easycode.jointblock.util.ExceptionUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Consumer for full (complete) data processing.
 *
 * <p>Takes batches ({@code List<I>}) off a shared {@link BlockingQueue}, hands each batch to an
 * {@link IDataProcessor} under the retry policy supplied by {@code RetryUtil.getRetryTemplate()},
 * and collects the stack trace of every batch that still fails after retrying.
 *
 * <p>{@link #finish()} may be called from a different thread than the one running {@link #call()};
 * the finish flag is {@code volatile} so that the signal is visible to the consumer thread.
 *
 * @param <I> element type of the batches being processed
 * @author xuzhen97
 */
@Slf4j
public class DataCompleteConsumer<I> implements Callable<List<String>>, DataCompleteProducerFinishNotify {

    /** How long a single poll waits for a batch before the finish flag is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 100L;

    private final BlockingQueue<List<I>> queue;
    private final IDataProcessor<I> processor;

    /**
     * Whether the upstream streaming query has finished.
     * Once set, the consumer exits after it has processed the batches remaining in the queue.
     * Declared {@code volatile} because it is written by {@link #finish()} and read by the
     * polling loop in {@link #call()}, which may run on different threads.
     */
    private volatile boolean isFinish = false;

    /**
     * @param queue     queue from which batches are taken
     * @param processor processor invoked once (plus retries) for every batch
     */
    public DataCompleteConsumer(BlockingQueue<List<I>> queue, IDataProcessor<I> processor) {
        this.queue = queue;
        this.processor = processor;
    }

    /**
     * Signals that no more batches will be produced. The consumer keeps draining whatever is
     * already queued and then returns from {@link #call()}.
     * NOTE(review): assumes the producer enqueues its last batch before calling this method.
     */
    @Override
    public void finish() {
        isFinish = true;
    }

    /**
     * Processes batches until {@link #finish()} has been signalled and the queue is drained.
     *
     * @return one entry per batch that still failed after retrying; each entry is the stack trace
     *         of the last exception recorded for that batch. Empty when every batch succeeded.
     * @throws InterruptedException if the thread is interrupted while waiting for a batch
     * @throws Exception            anything propagated by the retry template
     */
    @Override
    public List<String> call() throws Exception {

        List<String> errorMsgList = new ArrayList<>();

        while (true) {
            // Read the flag BEFORE polling: if it was already set and the poll still finds nothing,
            // the queue is drained. Checking it after the poll could miss a batch that was
            // enqueued between the poll and the check.
            boolean finishedBeforePoll = isFinish;

            // Timed poll instead of a non-blocking poll so an empty queue does not busy-spin.
            List<I> data = queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (data == null) {
                if (finishedBeforePoll) {
                    break;
                }
                continue;
            }

            // Holds the error of the most recent failed attempt only; earlier attempts are overwritten.
            AtomicReference<String> errorMsg = new AtomicReference<>();
            boolean r = RetryUtil.getRetryTemplate().execute(context -> {
                try {
                    processor.process(data);
                } catch (Exception e) {
                    log.warn("全量数据处理出现异常", e);
                    errorMsg.set(ExceptionUtil.getStackTrace(e));
                    // Rethrow so the retry template sees the failure.
                    throw e;
                }
                return true;
            }, context -> {
                // Recovery callback: reached when the retry template gives up on this batch.
                return false;
            });
            if (!r) {
                log.error("全量数据处理失败，错误信息：{}", errorMsg.get());
                // Record the failure so the caller receives it in the returned list.
                errorMsgList.add(errorMsg.get());
            }
        }
        return errorMsgList;
    }
}
